package server

import (
	"context"
	"github.com/go-kratos/kratos/v2/encoding"
	"github.com/go-kratos/kratos/v2/log"
	"github.com/tx7do/kratos-transport/broker"
	"github.com/tx7do/kratos-transport/transport/nsq"
	"kratos_kafka/internal/conf"
	"kratos_kafka/internal/service"
	"kratos_kafka/internal/utils/mq_nsq"
)

// NewNsqServer builds an NSQ transport server from the message-queue
// configuration, registers the book-topic subscriber on it, and starts it
// before returning.
//
// The server connects to c.Nsq.Server.Addr and uses the "json" codec.
// Messages from c.Nsq.BookTopic on channel c.Nsq.BookChannel are passed to
// service.ReceiveMsgFromNSQ.
//
// The logger parameter is accepted but not used by this constructor.
//
// NewNsqServer panics if the subscriber cannot be registered or if the
// server fails to start.
func NewNsqServer(c *conf.MessageQueue, logger log.Logger, service *service.GreeterService) *nsq.Server {
	nsqSrv := nsq.NewServer(
		nsq.WithAddress(c.Nsq.Server.Addr),
		nsq.WithCodec(encoding.GetCodec("json")),
	)

	// Register a consumer for the book topic.
	err := nsqSrv.RegisterSubscriber(
		c.Nsq.BookTopic, // topic
		mq_nsq.RegisterBookDataHandler(service.ReceiveMsgFromNSQ),
		mq_nsq.BookDataCreator,
		broker.WithQueueName(c.Nsq.BookChannel), // channel, comparable to a Kafka consumer group
		// NOTE: manual ACK setting.
		// If the option below is enabled, the handler must acknowledge each
		// message itself by calling event.ACK().
		//broker.DisableAutoAck(),
	)
	if err != nil {
		panic(err)
	}

	// Start the server.
	if err := nsqSrv.Start(context.Background()); err != nil {
		// Errorf (not Error) is required here: the message is a format string.
		log.Errorf("nsq server start error: %v", err)
		panic(err)
	}
	return nsqSrv
}
